feature: Implement saving operations to / reading from the on-disk storage #149


Merged: 12 commits into dev/persistent-queue, Mar 7, 2025

Conversation

PatrykGala (Collaborator):

Before submitting checklist

  • Did you update the CHANGELOG? (not for test updates, internal changes/refactors or CI/CD setup)
  • Did you ask the docs owner to review all the user-facing changes?

Base automatically changed from pg/run-repository to dev/persistent-queue February 27, 2025 15:42
@PatrykGala PatrykGala changed the title Pg/persistent mode feature: persistent mode Feb 27, 2025
yield operation, create_run.sequence_id, create_run.timestamp

if not operations:
    yield None
Contributor:

yield is not like return; lines 305-307 will still be executed.
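A minimal illustration of the point (hypothetical code, not from the PR): `yield` only suspends the generator, it does not exit the function, so the statements after it still run when the caller resumes iteration.

```python
def stream():
    operations = []
    if not operations:
        yield None  # suspends here; does NOT exit the function
    # this line still executes once the caller asks for the next value
    yield "still reached after the yield above"


gen = stream()
print(next(gen))  # None
print(next(gen))  # "still reached after the yield above"
```

An early `return` (or simply falling off the end of the function) is what actually terminates a generator.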

PatrykGala (Collaborator, Author):

Fixed. I've added tests for stream_attributes.

@property
def last_timestamp(self) -> Optional[float]:
    with self._lock:
        return self._last_timestamp
michalsosn (Contributor) Feb 28, 2025:

Should keeping last_sequence_id and last_timestamp be the responsibility of AttributeStore?
You are passing AttributeStore to LagTracker just to read this timestamp, even though LagTracker isn't interested in attribute values.
And you are manually updating the timestamp after creating a Run, even though that has nothing to do with attribute values.

It seems to belong more to the OperationsRepository itself, or maybe to some class in the middle:
.. <-> OperationsService <-> OperationsRepository
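A sketch of the "class in the middle" idea (the name `SequenceTracker` and its API are hypothetical, not from the PR): a small thread-safe holder for the last sequence id and timestamp that both the writer and LagTracker could share, without either one depending on attribute values.

```python
import threading
from typing import Optional


class SequenceTracker:
    """Hypothetical thread-safe holder for sync progress markers,
    so neither AttributeStore nor LagTracker has to own them."""

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._last_sequence_id: int = -1
        self._last_timestamp: Optional[float] = None

    def update(self, sequence_id: int, timestamp: float) -> None:
        # single place where progress markers are written
        with self._lock:
            self._last_sequence_id = sequence_id
            self._last_timestamp = timestamp

    @property
    def last_sequence_id(self) -> int:
        with self._lock:
            return self._last_sequence_id

    @property
    def last_timestamp(self) -> Optional[float]:
        with self._lock:
            return self._last_timestamp
```

With this, creating a Run and saving attribute snapshots both call `tracker.update(...)`, and LagTracker reads `tracker.last_timestamp` without ever touching the attribute store.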

PatrykGala (Collaborator, Author) Feb 28, 2025:

In my opinion, adding a class holding the last timestamp and last sequence id would be fine. I considered moving the 'create run' functionality to AttributeStore, but it's not a good solution.


@PatrykGala PatrykGala marked this pull request as ready for review February 28, 2025 15:24
@PatrykGala PatrykGala force-pushed the pg/persistent-mode branch from b1e64bb to fca2e4c Compare March 3, 2025 18:04
@PatrykGala PatrykGala changed the title feature: persistent mode feature: Implement saving operations to / reading from the on-disk storage Mar 4, 2025
@PatrykGala PatrykGala force-pushed the pg/persistent-mode branch from e4a4f03 to 07ff04d Compare March 4, 2025 12:53
@@ -57,7 +57,7 @@ def __init__(
     metrics: Optional[Metrics],
     add_tags: Optional[dict[str, Union[list[str], set[str], tuple[str]]]],
     remove_tags: Optional[dict[str, Union[list[str], set[str], tuple[str]]]],
-    max_message_bytes_size: int = 1024 * 1024,
+    max_message_bytes_size: int = 8 * 1024 * 1024,  # 8 MB
Member:

Why 8? Don't we have 16 in the API?

PatrykGala (Collaborator, Author):

Set to 2 MB.

@@ -692,3 +711,8 @@ def print_message(msg: str, *args: Any, last_print: Optional[float] = None, verb
return current_time

return last_print


def _create_repository_path(project: str, run_id: str) -> Path:
Member:

TODO: ideally we need to come up with something unambiguous, but usable (url encoding is not usable). Need to think some more.

yield operation, create_run.sequence_id, create_run.ts

if not operations:
    continue
Member:

We need some backoff here - 0.1s sleep for now.

PatrykGala (Collaborator, Author) Mar 5, 2025:

We use a Daemon thread:

class Daemon(threading.Thread):

with sleep time SYNC_THREAD_SLEEP_TIME = 0.5. If the list is empty, the loop in work finishes and the thread goes to sleep.
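A sketch of that pattern (hypothetical and simplified from the description above, not the PR's actual class): the thread runs `work` in a loop and waits between iterations, so an empty batch naturally backs off by `SYNC_THREAD_SLEEP_TIME` instead of busy-spinning.

```python
import threading
import time

SYNC_THREAD_SLEEP_TIME = 0.5  # seconds, as quoted in the comment above


class Daemon(threading.Thread):
    """Hypothetical minimal version of the daemon-with-sleep pattern."""

    def __init__(self, sleep_time: float) -> None:
        super().__init__(daemon=True)
        self._sleep_time = sleep_time
        self._stop_event = threading.Event()

    def run(self) -> None:
        while not self._stop_event.is_set():
            self.work()
            # back off between iterations; waking on the event allows
            # a prompt shutdown instead of a blind time.sleep()
            self._stop_event.wait(self._sleep_time)

    def work(self) -> None:
        raise NotImplementedError

    def stop(self) -> None:
        self._stop_event.set()
```

Using `Event.wait()` rather than `time.sleep()` is what lets `stop()` interrupt the backoff immediately.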

@@ -464,7 +343,32 @@ def _raise_exception(status_code: int) -> None:
     raise NeptuneUnexpectedResponseError()


-class StatusTrackingThread(Daemon, WithResources):
+def _merge_operations(operations: list[Operation]) -> tuple[UpdateRunSnapshots, SequenceId, datetime.datetime]:
Member:

IMO this should be inlined.

PatrykGala (Collaborator, Author):

Removed.

logger.debug("Submitting operation #%d with size of %d bytes", sequence_id, len(data))
run_operation = RunOperation()
run_operation.ParseFromString(data)
logger.debug("Submitting operation #%d with size of %d bytes", sequence_id, len(""))
Member:

Either remove or fill with something sensible (ByteSize is probably too heavy, but maybe snapshot count / create run count, or something?)

PatrykGala (Collaborator, Author):

I'll add a guard for debug level:

if logger.isEnabledFor(logging.DEBUG):
    logger.debug(
        "Submitting operation #%d with size of %d bytes", sequence_id, run_operation.ByteSize()
    )
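Why the guard helps (a general logging pattern, illustrated with a hypothetical stand-in for `ByteSize()`): `%`-style logging defers string formatting, but the arguments themselves are still evaluated at the call site, so an expensive argument like `run_operation.ByteSize()` would run even when DEBUG is off. `isEnabledFor` skips that work entirely.

```python
import logging

logger = logging.getLogger("example")
logger.setLevel(logging.INFO)  # DEBUG is disabled

calls = []


def expensive_size() -> int:
    # hypothetical stand-in for run_operation.ByteSize()
    calls.append(1)
    return 42


# Without the guard: expensive_size() runs even though nothing is logged.
logger.debug("size=%d", expensive_size())
assert len(calls) == 1

# With the guard: the argument is never computed when DEBUG is off.
if logger.isEnabledFor(logging.DEBUG):
    logger.debug("size=%d", expensive_size())
assert len(calls) == 1  # unchanged
```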

assert tracker.last_sequence_id == 10
assert tracker.last_timestamp == 123.456

# Update with a lower sequence ID
Member:

Can this actually happen?

PatrykGala (Collaborator, Author):

Without a lock in the attribute store: yes. We can add a lock here:

operations = list(splitter)
sequence_id = self._operations_repo.save_update_run_snapshots(operations)

self._sequence_tracker.update_sequence_id(sequence_id)
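A sketch of how a lower sequence id can arrive last without a lock (hypothetical timeline and class names, not from the PR), and one defensive alternative: make the tracker ignore non-monotonic updates.

```python
# Hypothetical interleaving with no lock around save + update:
#
#   Thread A: seq_a = repo.save(...)          # returns 10
#   Thread B: seq_b = repo.save(...)          # returns 11
#   Thread B: tracker.update_sequence_id(11)
#   Thread A: tracker.update_sequence_id(10)  # lower id arrives last
#
# A defensive fix: keep the tracker monotonic regardless of call order.

import threading


class MonotonicTracker:
    """Hypothetical tracker that never moves backwards."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.last_sequence_id = -1

    def update_sequence_id(self, sequence_id: int) -> None:
        with self._lock:
            # max() makes a late, lower-id update a no-op
            self.last_sequence_id = max(self.last_sequence_id, sequence_id)


tracker = MonotonicTracker()
tracker.update_sequence_id(11)
tracker.update_sequence_id(10)  # late, lower id is ignored
print(tracker.last_sequence_id)  # 11
```

This sidesteps the race rather than serializing save + update, at the cost of the timestamp and sequence id possibly coming from different operations.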

@@ -0,0 +1,167 @@
import time
Member:

I'm quite against this entire test. We're testing some internal implementation instead of behavior.

What we should do instead is write a unit test for sync process mocking the repository and the backend client.


@@ -242,6 +259,9 @@ def get_metadata(self) -> Optional[Metadata]:

version, project, run_id, parent_run_id, fork_step = row

if version != DB_VERSION:
    raise NeptuneLocalStorageInUnsupportedVersion()
PatrykGala (Collaborator, Author):

Maybe we should move the validation to the constructor or a dedicated method.

Member:

Left a comment elsewhere - let's remove it from here.

@@ -280,12 +279,15 @@ def _check_for_run_conflicts(
) -> None:
Member:

I think this method should become something like _validate_existing_db(), and within it there should be:

  1. Check if the version matches. If not -> NeptuneLocalStorageInUnsupportedVersion.
  2. Check if all other metadata matches. If not -> NeptuneConflictingDataInLocalStorage; if it does -> warning.

@PatrykGala PatrykGala merged commit 768e192 into dev/persistent-queue Mar 7, 2025
9 checks passed
@PatrykGala PatrykGala deleted the pg/persistent-mode branch March 7, 2025 09:00
github-merge-queue bot pushed a commit that referenced this pull request Mar 12, 2025
* feat: Implement OperationsRepository

* feat: Add test

* feat: Add test

* feat: Review

* feat: Add dataclass for metadata

* feat: Add dataclass for metadata - precommit

* feat: Move init db

* fix: Refactor OperationsRepository: replace window function. (#150)

* fix: Refactor OperationsRepository with performance improvements: replace window function.

* feature: Implement saving operations to / reading from the on-disk storage (#149)

feature: Implement saving operations to / reading from the on-disk storage

* feat: add sync cli

* reuse SyncProcess internals in sync cli

* remove sync-no-parent flag

* implement sync using SyncProcess

* Add progress bar

* Add tests

* change tqdm version range

* feat: add offline mode

Disable unnecessary components in disabled mode

* add tests

* Allow providing base directory for log files

This can be done using env var `NEPTUNE_LOG_DIRECTORY`,
or a new Run constructor argument `log_directory`.

* `OperationsRepository` requires an absolute db path

* Adjust Run log_directory arg. tests for absolute/relative paths

* Apply suggestions from code review

Simpler tests and path resolve logic, don't verify log_directory being not empty

Co-authored-by: Piotr Łusakowski <piotr.lusakowskI@neptune.ai>

* fix errors after merge

* add e2e tests

* feat: delete sqlite files after exit (#154)

* feat: delete sqlite files after exit

* Ignore test_cleanup_repository_conflict

---------

Co-authored-by: Michał Sośnicki <michal.sosnicki@neptune.ai>

* Set NEPTUNE_PROJECT in e2e tests

* When SyncProcess crashes the Run remains uninterrupted

* Give some time for background processing after killing the sync process in tests

* feat: delete operations on ack instead of send (#156)

* feat: delete operations on ack instead of send

* add tests

* add one last test

* fix code style

---------

Co-authored-by: Michał Sośnicki <michal.sosnicki@neptune.ai>

* Add timestamp to operations repository path (#157)

* feat: add timestamp to operations repository path

* fix: convert arg to Path in click command

* remove db validate

* Remove fork info from operations db

* sleep in tests

---------

Co-authored-by: Michał Sośnicki <michal.sosnicki@neptune.ai>

---------

Co-authored-by: Patryk Gala <patryk.gala@neptune.ai>
Co-authored-by: Michał Sośnicki <michal.sosnicki@neptune.ai>
Co-authored-by: Krzysztof Godlewski <krzysztof.godlewski@neptune.ai>
Co-authored-by: Piotr Łusakowski <piotr.lusakowskI@neptune.ai>